/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io.gcp.bigtable;



import com.google.bigtable.admin.v2.GetTableRequest;

import com.google.bigtable.v2.*;

import com.google.cloud.bigtable.config.BigtableOptions;

import com.google.cloud.bigtable.grpc.BigtableSession;

import com.google.cloud.bigtable.grpc.BigtableTableName;

import com.google.cloud.bigtable.grpc.async.BulkMutation;

import com.google.cloud.bigtable.grpc.scanner.ResultScanner;

import com.google.protobuf.ByteString;

import io.grpc.Status.Code;

import io.grpc.StatusRuntimeException;

import com.bff.gaia.unified.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;

import com.bff.gaia.unified.sdk.io.range.ByteKeyRange;

import com.bff.gaia.unified.sdk.values.KV;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import com.bff.gaia.unified.vendor.guava.com.google.common.io.Closer;

import com.bff.gaia.unified.vendor.guava.com.google.common.util.concurrent.FutureCallback;

import com.bff.gaia.unified.vendor.guava.com.google.common.util.concurrent.Futures;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;



import java.io.IOException;

import java.util.List;

import java.util.NoSuchElementException;

import java.util.concurrent.CompletableFuture;

import java.util.concurrent.CompletionStage;



/**

 * An implementation of {@link BigtableService} that actually communicates with the Cloud Bigtable

 * service.

 */

class BigtableServiceImpl implements BigtableService {
  // Note: the logger is registered under the BigtableService interface, not this class.
  private static final Logger LOG = LoggerFactory.getLogger(BigtableService.class);

  /** Options used to open every {@link BigtableSession} created by this service. */
  private final BigtableOptions options;

  /**
   * Creates a service whose sessions are opened with the given options.
   *
   * @param options the Bigtable options; the reference is stored as-is
   */
  public BigtableServiceImpl(BigtableOptions options) {
    this.options = options;
  }

  /** Returns the options this service was constructed with. */
  @Override
  public BigtableOptions getBigtableOptions() {
    return options;
  }

  /**
   * Opens a new session and returns a writer bound to the given table. The returned writer owns
   * the session and closes it from {@link BigtableWriterImpl#close()}.
   *
   * @param tableId id of the table to write to, resolved against the configured instance
   * @return a writer backed by a freshly opened session
   * @throws IOException declared by the interface; propagated from opening the session
   */
  @Override
  public BigtableWriterImpl openForWriting(String tableId) throws IOException {
    BigtableSession session = new BigtableSession(options);
    try {
      BigtableTableName tableName = options.getInstanceName().toTableName(tableId);
      return new BigtableWriterImpl(session, tableName);
    } catch (RuntimeException | Error e) {
      // The writer never took ownership of the session, so nobody else will close it.
      // Release it here and keep the original failure as the primary exception.
      try {
        session.close();
      } catch (Exception closeFailure) {
        e.addSuppressed(closeFailure);
      }
      throw e;
    }
  }

  /**
   * Checks whether a table exists by issuing a {@code GetTable} admin request on a short-lived
   * session.
   *
   * @param tableId id of the table to look up, resolved against the configured instance
   * @return {@code true} if the request succeeds, {@code false} if it fails with
   *     {@link Code#NOT_FOUND}
   * @throws IOException if the request fails with any other gRPC status; the failure is also
   *     logged at error level
   */
  @Override
  public boolean tableExists(String tableId) throws IOException {
    try (BigtableSession session = new BigtableSession(options)) {
      GetTableRequest getTable =
          GetTableRequest.newBuilder()
              .setName(options.getInstanceName().toTableNameStr(tableId))
              .build();
      session.getTableAdminClient().getTable(getTable);
      return true;
    } catch (StatusRuntimeException e) {
      if (e.getStatus().getCode() == Code.NOT_FOUND) {
        return false;
      }
      String message =
          String.format(
              "Error checking whether table %s (BigtableOptions %s) exists", tableId, options);
      LOG.error(message, e);
      throw new IOException(message, e);
    }
  }

  /** A {@link Reader} that scans the rows of a {@link BigtableSource} over a single session. */
  @VisibleForTesting
  static class BigtableReaderImpl implements Reader {
    // Set to null once close() has run.
    private BigtableSession session;
    private final BigtableSource source;
    // Null until start() is called, and again after close().
    private ResultScanner<Row> results;
    // Most recent row returned by the scanner; null when the scanner returned no row.
    private Row currentRow;

    /**
     * Creates a reader over the given session.
     *
     * @param session the session used to read; closed by {@link #close()}
     * @param source the source describing the table, key ranges and optional row filter
     */
    @VisibleForTesting
    BigtableReaderImpl(BigtableSession session, BigtableSource source) {
      this.session = session;
      this.source = source;
    }

    /**
     * Issues the {@code ReadRows} request covering every range of the source and moves to the
     * first row.
     *
     * @return {@code true} if a first row is available
     * @throws IOException declared by the interface
     */
    @Override
    public boolean start() throws IOException {
      // Each source range becomes a [startKey, endKey) row range in the request.
      RowSet.Builder rowSetBuilder = RowSet.newBuilder();
      for (ByteKeyRange sourceRange : source.getRanges()) {
        rowSetBuilder.addRowRanges(
            RowRange.newBuilder()
                .setStartKeyClosed(ByteString.copyFrom(sourceRange.getStartKey().getValue()))
                .setEndKeyOpen(ByteString.copyFrom(sourceRange.getEndKey().getValue())));
      }

      String tableNameStr =
          session.getOptions().getInstanceName().toTableNameStr(source.getTableId().get());

      ReadRowsRequest.Builder request =
          ReadRowsRequest.newBuilder().setRows(rowSetBuilder.build()).setTableName(tableNameStr);
      // The row filter is optional; the request is left unfiltered when the source has none.
      if (source.getRowFilter() != null) {
        request.setFilter(source.getRowFilter());
      }

      results = session.getDataClient().readRows(request.build());
      return advance();
    }

    /**
     * Fetches the next row from the scanner opened by {@link #start()}.
     *
     * @return {@code true} if the scanner returned a non-null row
     * @throws IOException declared by the interface
     */
    @Override
    public boolean advance() throws IOException {
      currentRow = results.next();
      return currentRow != null;
    }

    /**
     * Closes the session and the scanner (if one was opened). Calling this on an already closed
     * reader is a no-op.
     *
     * @throws IOException if closing the session or the scanner fails
     */
    @Override
    public void close() throws IOException {
      // Goal: by the end of this function, both results and session are null and closed,
      // independent of what errors they throw or prior state.

      if (session == null) {
        // Only possible when previously closed, so we know that results is also null.
        return;
      }

      // Session does not implement Closeable -- it's AutoCloseable. So we can't register it with
      // the Closer, but we can use the Closer to simplify the error handling.
      try (Closer closer = Closer.create()) {
        if (results != null) {
          closer.register(results);
          results = null;
        }

        session.close();
      } finally {
        session = null;
      }
    }

    /**
     * Returns the row fetched by the last successful {@link #start()} or {@link #advance()}.
     *
     * @throws NoSuchElementException if there is no current row
     */
    @Override
    public Row getCurrentRow() throws NoSuchElementException {
      if (currentRow == null) {
        throw new NoSuchElementException();
      }
      return currentRow;
    }
  }

  /** A {@link Writer} that batches mutations through a {@link BulkMutation} on one session. */
  @VisibleForTesting
  static class BigtableWriterImpl implements Writer {
    // Both fields are reset to null by close().
    private BigtableSession session;
    private BulkMutation bulkMutation;

    /**
     * Creates a writer and its bulk mutation for the given table.
     *
     * @param session the session used to write; closed by {@link #close()}
     * @param tableName the table the bulk mutation is created for
     */
    BigtableWriterImpl(BigtableSession session, BigtableTableName tableName) {
      this.session = session;
      bulkMutation = session.createBulkMutation(tableName);
    }

    /**
     * Flushes the bulk mutation, if the writer is still open.
     *
     * @throws IOException if the flush is interrupted; the thread's interrupt flag is restored
     */
    @Override
    public void flush() throws IOException {
      flushPendingMutations();
    }

    /**
     * Flushes the bulk mutation and closes the session. Both references are cleared even when
     * the flush or the session close fails, so a repeated call is a no-op.
     *
     * @throws IOException if the flush is interrupted or closing the session fails
     */
    @Override
    public void close() throws IOException {
      try {
        flushPendingMutations();
      } finally {
        // Clear the state before closing so that a failure below cannot leave this writer
        // holding a bulk mutation or session that a later call would try to use again.
        bulkMutation = null;
        BigtableSession closing = session;
        session = null;
        if (closing != null) {
          closing.close();
        }
      }
    }

    /**
     * Queues the mutations of one row on the bulk mutation.
     *
     * @param record the row key paired with the mutations to apply to that row
     * @return a stage completed with the response, or completed exceptionally with the failure
     *     reported for this entry
     * @throws IOException declared by the interface
     * @throws IllegalStateException if the writer has already been closed
     */
    @Override
    public CompletionStage<MutateRowResponse> writeRecord(KV<ByteString, Iterable<Mutation>> record)
        throws IOException {
      if (bulkMutation == null) {
        throw new IllegalStateException("Cannot write a record: the writer has been closed");
      }

      MutateRowsRequest.Entry request =
          MutateRowsRequest.Entry.newBuilder()
              .setRowKey(record.getKey())
              .addAllMutations(record.getValue())
              .build();

      // Bridge the future returned by the bulk mutation to a CompletableFuture.
      CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
      Futures.addCallback(
          new VendoredListenableFutureAdapter<>(bulkMutation.add(request)),
          new FutureCallback<MutateRowResponse>() {
            @Override
            public void onSuccess(MutateRowResponse mutateRowResponse) {
              result.complete(mutateRowResponse);
            }

            @Override
            public void onFailure(Throwable throwable) {
              result.completeExceptionally(throwable);
            }
          });
      return result;
    }

    /** Flushes the bulk mutation if present, converting an interrupt into an IOException. */
    private void flushPendingMutations() throws IOException {
      if (bulkMutation == null) {
        return;
      }
      try {
        bulkMutation.flush();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        // We fail since flush() operation was interrupted.
        throw new IOException(e);
      }
    }
  }

  /** Returns a string containing the class name and the configured options. */
  @Override
  public String toString() {
    return MoreObjects.toStringHelper(BigtableServiceImpl.class).add("options", options).toString();
  }

  /**
   * Opens a new session and returns a reader for the given source. The returned reader owns the
   * session and closes it from {@link BigtableReaderImpl#close()}.
   *
   * @param source the source to read
   * @return a reader that has not been started yet
   * @throws IOException declared by the interface; propagated from opening the session
   */
  @Override
  public Reader createReader(BigtableSource source) throws IOException {
    BigtableSession session = new BigtableSession(options);
    return new BigtableReaderImpl(session, source);
  }

  /**
   * Requests sample row keys for the source's table on a short-lived session.
   *
   * @param source the source whose table id is sampled
   * @return the responses returned by the data client
   * @throws IOException declared by the interface; propagated from the session
   */
  @Override
  public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) throws IOException {
    try (BigtableSession session = new BigtableSession(options)) {
      SampleRowKeysRequest request =
          SampleRowKeysRequest.newBuilder()
              .setTableName(options.getInstanceName().toTableNameStr(source.getTableId().get()))
              .build();
      return session.getDataClient().sampleRowKeys(request);
    }
  }
}